package org.voovan.korla.socket;

import org.voovan.Global;
import org.voovan.korla.exception.KorlaException;
import org.voovan.korla.message.Msg;
import org.voovan.korla.message.MsgRegister;
import org.voovan.korla.message.SyncCallback;
import org.voovan.network.EventProcess;
import org.voovan.network.SocketContext;
import org.voovan.network.exception.SendMessageException;
import org.voovan.network.filter.ByteFilter;
import org.voovan.network.messagesplitter.ByteMessageSplitter;
import org.voovan.network.tcp.TcpSocket;
import org.voovan.tools.TEnv;
import org.voovan.tools.TPerformance;
import org.voovan.tools.log.Logger;
import org.voovan.tools.pool.PooledObject;
import org.voovan.tools.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import static org.voovan.korla.KorlaStatic.*;
import static org.voovan.tools.TObject.nullDefault;

/**
 * Korla 消费者
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class KorlarConsumer extends PooledObject {
    //使用一半的 cpu 数做延迟发送
    private static int THREAD_POOL_SIZE = TPerformance.getProcessorCount()/2;
    private static ThreadPoolExecutor FLUSH_THREAD_POOL = ThreadPool.createThreadPool("Korla_Flush", THREAD_POOL_SIZE, THREAD_POOL_SIZE, 60000);
    private static DelayQueue<FlushTask> FLUSH_QUEUE = new DelayQueue<FlushTask>();
    private static Thread FLUSH_THREAD;
    static {
        ThreadPoolExecutor threadPoolExecutor = Global.getThreadPool();
        if(threadPoolExecutor.getCorePoolSize()<=0 || threadPoolExecutor.getMaximumPoolSize()<=0) {
            Logger.error("Globle thread pool not avaliable, please modify config parameter [ThreadPoolMinSize, ThreadPoolMaxSize] in framework.properties ");
        }

        FLUSH_THREAD = new Thread(()->{
            while(true) {
                try {
                    FlushTask flushTask = FLUSH_QUEUE.take();
                    FLUSH_THREAD_POOL.execute(()->{
                        flushTask.run();
                    });
                } catch (Exception e) {
                    Logger.error("Time flush error", e);
                }
            }
        }, "Korla_Flush_Manager");
        FLUSH_THREAD.start();
    }

    private String host;
    private int port;
    private int readTimeout;
    private int sendTimeout;
    private int poolSize;
    private String channel;
    private transient TcpSocket tcpSocket;
    private transient SocketHandler socketHandler = new SocketHandler();

    private int batchSize = -1;
    private int batchInterval = -1;
    private AtomicInteger batchCount = new AtomicInteger();
    private FlushTask flushTask;

    /**
     * @param channel      通道名称
     * @param host      监听地址
     * @param port		监听端口
     * @param readTimeout   超时时间, 单位: 毫秒
     * @param sendTimeout 发超时时间, 单位: 毫秒
     * @throws IOException	IO异常
     */
    public KorlarConsumer(String channel, String host, int port, int readTimeout, int sendTimeout) throws IOException {
        this.host = host;
        this.port = port;
        this.readTimeout = readTimeout;
        this.sendTimeout = sendTimeout;

        this.channel = nullDefault(channel, DEFAULT_CHANNEL);
        tcpSocket = new TcpSocket(host, port, readTimeout, sendTimeout, 1);
        tcpSocket.messageSplitter(new ByteMessageSplitter());

        tcpSocket.filterChain().add(new ByteFilter());
        tcpSocket.filterChain().add(new KorlaFilter());
        tcpSocket.handler(socketHandler);
    }

    /**
     *
     * @param host      监听地址
     * @param port		监听端口
     * @param readTimeout   超时时间, 单位: 毫秒
     * @param sendTimeout 发超时时间, 单位: 毫秒
     * @throws IOException	IO异常
     */
    public KorlarConsumer(String host, int port, int readTimeout, int sendTimeout) throws IOException {
       this(null, host, port, readTimeout, sendTimeout);
    }

    protected TcpSocket getTcpSocket() {
        return tcpSocket;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public int getBatchInterval() {
        return batchInterval;
    }

    public int getBatchCount() {
        return batchCount.get();
    }

    /**
     * 连接服务
     * @throws IOException IO 异常
     */
    public void connect() throws IOException {
        tcpSocket.syncStart();

        //向服务端注册
        send(new MsgRegister(this.channel));

        TEnv.wait(tcpSocket.getReadTimeout(), ()->!tcpSocket.getSession().containAttribute(CONNECT_STR));
    }

    /**
     * 发送消息
     * @param msg 发送的对象
     * @return true: 发送成功, false: 发送失败
     */
    public void send(Msg msg) {
        try {
            //如果没有回调,设置为默认的同步回调器
            if(msg.getCallBack() == null) {
                msg.setCallBack(new SyncCallback(msg));
            }

            baseSend(msg);
        } catch (Exception e) {
            throw new KorlaException("Client send failed", e);
        }
    }

    /**
     * 发送并获取消息
     * @param msg 发送的对象
     * @return true: 发送成功, false: 发送失败
     */
    public <T extends Msg> T send(Msg msg, long timeoutMillis) {
        try {
            //如果没有回调,设置为默认的同步回调器
            if(msg.getCallBack() == null) {
                msg.setCallBack(new SyncCallback(msg));
            }

            baseSend(msg);
            return (T) msg.getResponse(timeoutMillis);
        } catch (Exception e) {
            throw new KorlaException("Client send failed", e);
        }
    }

    public void batch(int batchSize, int batchInterval){
        this.batchSize = batchSize;
        this.batchInterval = batchInterval;
    }

    private void buildFlushTask() {
        if(batchSize<=0 || batchInterval <=0) {
            return;
        }

        if(flushTask == null) {
            this.batchSize = batchSize;
            this.batchInterval = batchInterval;
            final KorlarConsumer finalKorlarConsumer = this;

            flushTask = new FlushTask(batchInterval, ()->{
                if (finalKorlarConsumer.isConnect()) {
                    finalKorlarConsumer.flush();
                }
            });
            FLUSH_QUEUE.add(flushTask);
        }
    }

    private void baseSend(Msg msg) throws SendMessageException {
        if(batchSize > 0 && batchInterval >0) {
            EventProcess.sendMessage(tcpSocket.getSession(), msg);

            buildFlushTask();

            if(batchCount.incrementAndGet()>=batchSize){
                flush();
            }
        } else {
            tcpSocket.syncSend(msg);
        }
    }

    public synchronized void flush() {
        flushTask.setAvaliable(false);
        flushTask = null;
        tcpSocket.getSession().flush();
        batchCount.set(0);
    }

    /**
     * 消息重试
     * @param msg 被重试的消息
     * @return 相应消息
     */
    public Msg retry(Msg msg) {
        Msg result = msg.getCallBack().execute(msg);
        return result;
    }

    /**
     * 是否处于连接状态
     * @return true: 连接中, false: 未连接
     */
    public boolean isConnect(){
        return tcpSocket.isConnected();
    }

    /**
     * 关闭连接
     */
    public void close(){
        if(flushTask!=null) {
            FLUSH_QUEUE.remove(flushTask);
        }
        tcpSocket.close();
    }

}
